test: unit tests for ExecutionMetrics and row count pipeline #72
abhizipstack wants to merge 2 commits into feat/run-history-redesign from
Conversation
21 tests covering:
- _get_row_count_safe: success, exception fallback, zero count
- BaseModel.execute(): routing per materialization (TABLE/VIEW/EPHEMERAL/INCREMENTAL), upsert metrics threading, partial metrics, COUNT failure
- Per-adapter _upsert_metrics: Postgres (merge + append), Snowflake, Databricks, Trino, BigQuery (skipped if package missing), DuckDB (skipped)
- Celery result JSON aggregation: mixed types, all-None → None not 0, class name cleaning, failed model counting

Also adds tests/unit/conftest.py with minimal Django settings for local runs (CI uses pytest-django with DJANGO_SETTINGS_MODULE from workflow env).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
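As one illustration of the "exception fallback" case listed above, a test of roughly this shape fits (a sketch only: the import path and `BaseModel` constructor shape are assumptions, not confirmed by this PR):

```python
from unittest.mock import Mock


def test_get_row_count_safe_returns_none_on_count_failure():
    # Hypothetical sketch: per the suite description, _get_row_count_safe
    # swallows COUNT(*) errors and falls back instead of raising.
    from visitran.model.base import BaseModel  # assumed import path

    conn = Mock()
    conn.execute.side_effect = RuntimeError("COUNT(*) failed")

    db_model = BaseModel(conn, Mock())  # constructor shape assumed
    assert db_model._get_row_count_safe() is None
```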
| Filename | Overview |
|---|---|
| tests/unit/conftest.py | Minimal Django setup for local test runs; missing django.setup() (already flagged in prior review), unlike the backend counterpart, which correctly includes it |
| tests/unit/test_execution_metrics.py | 391-line condensed suite; per-adapter merge tests don't patch fire_event (flagged in prior review for Postgres); TestCeleryResultAggregation replicates production aggregation logic instead of calling it directly (flagged in prior review) |
| backend/tests/unit_tests/conftest.py | Correctly calls django.setup() inside the configure block; structurally mirrors tests/unit/conftest.py but is the more complete version |
| backend/tests/unit_tests/test_execution_metrics.py | 808-line comprehensive suite; TestUpsertReturnValues class (lines ~338-418) tests only Mock return values, not real adapter code; TestCeleryResultJson._build_result_json replicates production aggregation logic |
Flowchart

```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[BaseModel.execute] --> B{materialization}
    B -->|EPHEMERAL| C[execute_ephemeral\nrows_affected=None]
    B -->|TABLE| D[execute_table\n+ _get_row_count_safe\nrows_inserted=rows]
    B -->|VIEW| E[execute_view\nrows_affected=None]
    B -->|INCREMENTAL| F[execute_incremental\n+ _get_row_count_safe]
    F --> G{_upsert_metrics set?}
    G -->|Yes| H[rows_inserted/updated/deleted\nfrom adapter dict]
    G -->|No| I[rows_inserted/updated/deleted=None]
    H --> J[ExecutionMetrics]
    I --> J
    C --> J
    D --> J
    E --> J
```
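The routing in the flowchart condenses to a short Python sketch (a hedged reconstruction: method names and dict keys come from the diagram, while signatures and the metrics dataclass shape are assumptions):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ExecutionMetrics:
    # Field names follow the flowchart; the production class may differ.
    rows_affected: Optional[int] = None
    rows_inserted: Optional[int] = None
    rows_updated: Optional[int] = None
    rows_deleted: Optional[int] = None


def execute(model) -> ExecutionMetrics:
    """Hypothetical condensation of BaseModel.execute routing."""
    mat = model.materialization
    if mat == "EPHEMERAL":
        model.execute_ephemeral()
        return ExecutionMetrics()  # rows_affected stays None
    if mat == "VIEW":
        model.execute_view()
        return ExecutionMetrics()  # rows_affected stays None
    if mat == "TABLE":
        model.execute_table()
        rows = model._get_row_count_safe()  # None on COUNT failure
        return ExecutionMetrics(rows_affected=rows, rows_inserted=rows)
    # INCREMENTAL: the adapter may record an upsert-metrics dict during merge.
    model.execute_incremental()
    upsert = model._upsert_metrics or {}  # .get() yields None when unset
    return ExecutionMetrics(
        rows_affected=model._get_row_count_safe(),
        rows_inserted=upsert.get("rows_inserted"),
        rows_updated=upsert.get("rows_updated"),
        rows_deleted=upsert.get("rows_deleted"),
    )
```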
Prompt To Fix All With AI
This is a comment left during a code review.
Path: backend/tests/unit_tests/test_execution_metrics.py
Line: 338-418
Comment:
**`TestUpsertReturnValues` only tests Mock configuration, not real adapter code**
Every test in this class creates a `Mock()` connection, sets `conn.upsert_into_table.return_value = {...}`, calls `conn.upsert_into_table(...)`, and asserts the configured value is returned. That verifies Python's `unittest.mock` works, not any production adapter logic. The class docstring acknowledges "we mock at the connection level rather than instantiating real connections," but the result is zero regression coverage for the actual upsert methods.
The per-adapter model tests in section 6 (e.g. `TestPostgresModelUpsertMetrics`) do instantiate real adapter classes and are the ones that provide actual coverage. Consider removing or replacing this section with tests that at least import and exercise the real connection code (similar to how the section-6 tests do).
How can I resolve this? If you propose a fix, please make it concise.
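One concise resolution, following the comment's own suggestion, is to mirror the section-6 pattern: instantiate the real adapter model class and mock only the connection beneath it. A sketch, reusing the suite's existing `_make_incremental_model` helper; the returned dict shape copies the Snowflake test rather than any confirmed Postgres contract:

```python
from unittest.mock import Mock, patch


class TestPostgresUpsertViaRealModel:
    def test_merge_threads_connection_metrics(self):
        from visitran.adapters.postgres.model import PostgresModel

        conn = Mock()
        conn.upsert_into_table.return_value = {"rows_affected": 42}

        # Real adapter class, mocked connection: the production merge path,
        # not unittest.mock, is what threads the dict into _upsert_metrics.
        db_model = PostgresModel(conn, _make_incremental_model())
        db_model._has_schema_changed = Mock(return_value=False)
        with patch("visitran.adapters.postgres.model.fire_event"):
            db_model.execute_incremental()

        assert db_model._upsert_metrics == {"rows_affected": 42}
```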
---
This is a comment left during a code review.
Path: backend/tests/unit_tests/conftest.py
Line: 1-15
Comment:
**Duplicate test tree may cause coverage drift**
This PR places tests in two separate locations — `tests/unit/` (391 lines) and `backend/tests/unit_tests/` (808 lines) — with overlapping but non-identical content. The PR description's run instructions only reference `tests/unit/`, and `backend/tests/unit_tests/` contains additional test classes (`TestUpsertReturnValues`, `TestAdapterRunModel`, `TestExecuteGraphMetrics`, `TestCeleryResultJson`) not present in the other directory.
Having two canonical homes for the same feature's tests risks the two copies diverging silently as the codebase evolves. It also makes it unclear which suite CI runs. Consolidating into one location (likely `tests/unit/` per the run instructions) and clearly documenting the CI invocation would eliminate the ambiguity.
How can I resolve this? If you propose a fix, please make it concise.

Reviews (2): Last reviewed commit: "fix: address Greptile review — django.se..."
```python
def test_incremental_no_upsert_metrics(self):
    pytest.importorskip("duckdb", reason="duckdb not installed")
    from visitran.adapters.duckdb.model import DuckDbModel

    conn = Mock()
    db_model = DuckDbModel(conn, _make_incremental_model())
    db_model.execute_incremental()

    assert db_model._upsert_metrics is None


# ============================================================================
# Celery result JSON aggregation
# ============================================================================


class TestCeleryResultAggregation:
    """Test the aggregation logic replicated from celery_tasks.py."""

    def _aggregate(self, user_results):
        def _clean_name(n):
            if "'" in n:
                return n.split("'")[1].split(".")[-1]
            return n

        return {
            "models": [
                {
                    "name": _clean_name(r.node_name),
                    "rows_affected": getattr(r, "rows_affected", None),
                    "rows_inserted": getattr(r, "rows_inserted", None),
                    "rows_updated": getattr(r, "rows_updated", None),
                    "rows_deleted": getattr(r, "rows_deleted", None),
                    "type": getattr(r, "materialization", "") or "",
                    "duration_ms": getattr(r, "duration_ms", None),
                }
                for r in user_results
            ],
            "total": len(user_results),
            "passed": sum(1 for r in user_results if r.end_status == "OK"),
            "failed": sum(1 for r in user_results if r.end_status == "FAIL"),
            "rows_processed": sum(getattr(r, "rows_affected", 0) or 0 for r in user_results) or None,
            "rows_added": sum(getattr(r, "rows_inserted", 0) or 0 for r in user_results) or None,
            "rows_modified": sum(getattr(r, "rows_updated", 0) or 0 for r in user_results) or None,
            "rows_deleted": sum(getattr(r, "rows_deleted", 0) or 0 for r in user_results) or None,
        }

    def _result(self, **kwargs):
        defaults = dict(
            node_name="model", status="SUCCESS", info_message="",
            failures=False, ending_time=datetime.datetime.now(),
            sequence_num=1, end_status="OK",
        )
        defaults.update(kwargs)
        return BaseResult(**defaults)

    def test_mixed_table_and_incremental(self):
        results = [
            self._result(rows_affected=500, rows_inserted=500, rows_updated=0, rows_deleted=0, materialization="table"),
            self._result(rows_affected=200, rows_inserted=100, rows_updated=80, rows_deleted=20, materialization="incremental", sequence_num=2),
        ]
        j = self._aggregate(results)

        assert j["rows_processed"] == 700
        assert j["rows_added"] == 600
        assert j["rows_modified"] == 80
        assert j["rows_deleted"] == 20
        assert j["passed"] == 2

    def test_all_none_views_returns_none_not_zero(self):
        results = [
            self._result(rows_affected=None, materialization="view"),
            self._result(rows_affected=None, materialization="ephemeral", sequence_num=2),
        ]
        j = self._aggregate(results)

        assert j["rows_processed"] is None
        assert j["rows_added"] is None

    def test_class_name_cleaned(self):
        r = self._result(node_name="<class 'project.models.stg_orders.StgOrders'>")
        j = self._aggregate([r])

        assert j["models"][0]["name"] == "StgOrders"

    def test_failed_model_counted(self):
        results = [
            self._result(rows_affected=100, rows_inserted=100),
            self._result(node_name="bad", status="ERROR", failures=True, end_status="FAIL", sequence_num=2),
        ]
        j = self._aggregate(results)

        assert j["passed"] == 1
        assert j["failed"] == 1
        assert j["rows_processed"] == 100
```
Prompt To Fix With AI
This is a comment left during a code review.
Path: tests/unit/test_execution_metrics.py
Line: 297-391
Comment:
**Tests exercise a copy of production logic, not the real code**
`TestCeleryResultAggregation._aggregate` replicates the aggregation from `celery_tasks.py` line 357–388 rather than calling the actual function. This means regressions in the real code will silently pass these tests.
The copy also diverges from production in two ways that leave real behaviors untested: (1) it omits the `"Source"` model filter (`user_results = [r for r in ... if not _clean_name(r.node_name).startswith("Source")]`), so there is no coverage of that exclusion logic; (2) each model dict in the production output also contains `"status"`, `"end_status"`, and `"sequence"` fields that are absent from the test's local copy. Consider extracting the pure aggregation logic from `celery_tasks.py` into a standalone function and importing it directly in the test, which would make the tests both accurate and regression-proof.
How can I resolve this? If you propose a fix, please make it concise.
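A concise fix is the extraction the comment proposes: hoist the nested aggregation into a module-level function that both `celery_tasks.py` and the tests import. Below is a sketch reconstructed from the review's description; the name `build_result_json` and the exact field order are assumptions:

```python
# celery_tasks.py: pure, importable aggregation (sketch).
def build_result_json(results):
    """Aggregate per-model results into the run-history JSON payload."""
    def _clean_name(n):
        if "'" in n:
            return n.split("'")[1].split(".")[-1]
        return n

    # Behavior the test's local copy omitted: exclude "Source" models.
    user_results = [
        r for r in results if not _clean_name(r.node_name).startswith("Source")
    ]

    return {
        "models": [
            {
                "name": _clean_name(r.node_name),
                "status": r.status,          # fields the test copy dropped
                "end_status": r.end_status,
                "sequence": r.sequence_num,
                "rows_affected": getattr(r, "rows_affected", None),
                "rows_inserted": getattr(r, "rows_inserted", None),
                "rows_updated": getattr(r, "rows_updated", None),
                "rows_deleted": getattr(r, "rows_deleted", None),
                "type": getattr(r, "materialization", "") or "",
                "duration_ms": getattr(r, "duration_ms", None),
            }
            for r in user_results
        ],
        "total": len(user_results),
        "passed": sum(1 for r in user_results if r.end_status == "OK"),
        "failed": sum(1 for r in user_results if r.end_status == "FAIL"),
        # `or None` keeps the all-None case as None rather than 0.
        "rows_processed": sum(getattr(r, "rows_affected", 0) or 0 for r in user_results) or None,
        "rows_added": sum(getattr(r, "rows_inserted", 0) or 0 for r in user_results) or None,
        "rows_modified": sum(getattr(r, "rows_updated", 0) or 0 for r in user_results) or None,
        "rows_deleted": sum(getattr(r, "rows_deleted", 0) or 0 for r in user_results) or None,
    }
```

The test class would then replace its local `_aggregate` with the imported function, gaining coverage of the `"Source"` filter and the `status`/`end_status`/`sequence` fields at no extra cost.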
```python
conn = Mock()
model = _make_incremental_model()
model.primary_key = None

db_model = PostgresModel(conn, model)
db_model._has_schema_changed = Mock(return_value=False)
db_model.execute_incremental()

assert db_model._upsert_metrics is None


class TestSnowflakeModelMetrics:

    def test_merge_sets_upsert_metrics(self):
        from visitran.adapters.snowflake.model import SnowflakeModel

        conn = Mock()
        conn.upsert_into_table.return_value = {"rows_affected": 88}

        db_model = SnowflakeModel(conn, _make_incremental_model())
        db_model._has_schema_changed = Mock(return_value=False)
        db_model.execute_incremental()

        assert db_model._upsert_metrics == {"rows_affected": 88}


class TestDatabricksModelMetrics:

    def test_merge_sets_upsert_metrics(self):
        from visitran.adapters.databricks.model import DatabricksModel
```
Prompt To Fix With AI
This is a comment left during a code review.
Path: tests/unit/test_execution_metrics.py
Line: 219-250
Comment:
**Postgres merge test path hides fire_event side-effect**
`test_merge_sets_upsert_metrics` mocks `_has_schema_changed` but leaves `fire_event(ExecuteIncrementalUpdate(...))` unpatched. On a CI worker where `visitran.events.functions.fire_event` appends to the global `BASE_RESULT` list or performs I/O, repeated test runs on the same process could accumulate stale entries or trigger unexpected behaviour. Patching `visitran.adapters.postgres.model.fire_event` (or clearing `BASE_RESULT` in a teardown) would isolate the test properly.
How can I resolve this? If you propose a fix, please make it concise.
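A minimal application of the suggested fix is an autouse fixture scoped to the Postgres tests; the patch target below follows the comment's wording and should be adjusted if `fire_event` is imported differently:

```python
import pytest
from unittest.mock import patch


@pytest.fixture(autouse=True)
def _silence_fire_event():
    # Patch the name as bound inside the postgres model module; patching
    # visitran.events.functions.fire_event would miss the already-imported
    # reference, and the global BASE_RESULT list could still accumulate
    # entries across repeated runs in the same process.
    with patch("visitran.adapters.postgres.model.fire_event") as mocked:
        yield mocked
```

Scoping the fixture to the module (or to the `TestPostgresModelUpsertMetrics` class) keeps other adapters' event behaviour observable.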
```python
import django.conf

if not django.conf.settings.configured:
    django.conf.settings.configure(
        GS_BUCKET_NAME="test-bucket",
        CELERY_BROKER_URL="memory://",
        DATABASES={},
        INSTALLED_APPS=[],
        DEFAULT_AUTO_FIELD="django.db.models.BigAutoField",
        SECRET_KEY="test-secret-key",
        CACHES={"default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}},
    )
```
Prompt To Fix With AI
This is a comment left during a code review.
Path: tests/unit/conftest.py
Line: 6-17
Comment:
**Missing `django.setup()` call may cause `AppRegistryNotReady` locally**
`django.conf.settings.configure(...)` initialises the settings object but does not set up the Django application registry. Any import that triggers model loading or `AppConfig.ready()` (including some signal registrations in `visitran`) can raise `django.core.exceptions.AppRegistryNotReady`. Adding `import django; django.setup()` immediately after the `if not settings.configured:` block (matching what `pytest-django` does in CI) prevents this class of intermittent local failures.
How can I resolve this? If you propose a fix, please make it concise.

… note
- Add django.setup() after settings.configure() in conftest.py to prevent AppRegistryNotReady errors (Greptile P2)
- Patch fire_event in TestPostgresModelUpsertMetrics to prevent side-effects from the event system during test execution (Greptile P2)
- Add NOTE explaining that TestCeleryResultJson replicates production logic (cannot import — nested function). Add TODO for extraction. (Greptile P1 — acknowledged, tracked for future refactor)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
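Applied to the conftest shown above, the fix the follow-up commit describes is a single added call (a sketch mirroring the settings already shown; only the `django.setup()` line is new):

```python
# tests/unit/conftest.py
import django
import django.conf

if not django.conf.settings.configured:
    django.conf.settings.configure(
        GS_BUCKET_NAME="test-bucket",
        CELERY_BROKER_URL="memory://",
        DATABASES={},
        INSTALLED_APPS=[],
        DEFAULT_AUTO_FIELD="django.db.models.BigAutoField",
        SECRET_KEY="test-secret-key",
        CACHES={"default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}},
    )
    # Populate the app registry so model imports and AppConfig.ready()
    # hooks don't raise AppRegistryNotReady in local (non-pytest-django) runs.
    django.setup()
```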
What
tests/unit/conftest.py for local Django setup (CI uses pytest-django)

Why
How
Tests are organized by layer:
BigQuery and DuckDB tests are skipped via pytest.importorskip when optional packages are not installed locally — they will run in CI where all dependencies are available.

Can this PR break any existing features? If yes, please list possible items. If no, please explain why.
No. This PR only adds test files — no production code changes.
Database Migrations
None
Env Config
None
Relevant Docs
Related Issues or PRs
Dependencies Versions
No changes
Notes on Testing
Expected: 19 passed, 2 skipped (BigQuery + DuckDB packages not installed locally)
Screenshots
Checklist
I have read and understood the Contribution Guidelines.